Walkthrough 12-3: Schedule a flow and use manual watermarking
In this walkthrough, you continue to work with the accounts table in the training database. You will:
· Use the Scheduler component to create a new flow that executes at a specific frequency.
· Retrieve accounts with a specific postal code from the accounts table.
· Use the Object Store component to store the ID of the latest record and then use it to only retrieve new records.
Starting file
If you did not complete the previous walkthrough, you can get a starting file here. This file is also in the solutions folder of the student files ZIP in the Course Resources.
Create a flow that executes at a specific frequency
1. Return to accounts.xml in Anypoint Studio.
2. Locate the Scheduler component in the Core section of the Mule Palette.
3. Drag a Scheduler component from the Mule Palette and drop it at the top of the canvas to create a new flow.
4. Change the name of the flow to syncDBaccountsWithPostal.
5. In the Scheduler properties view, set the frequency to 10 seconds.
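For reference, if you switch to the Configuration XML view, the flow created by these steps might look roughly like the following. Treat this as a sketch: Studio also generates doc:id attributes, which are omitted here.

```xml
<!-- Sketch of the new flow; Studio-generated doc:id attributes are omitted -->
<flow name="syncDBaccountsWithPostal">
    <!-- Triggers the flow every 10 seconds -->
    <scheduler doc:name="Scheduler">
        <scheduling-strategy>
            <fixed-frequency frequency="10" timeUnit="SECONDS"/>
        </scheduling-strategy>
    </scheduler>
</flow>
```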
Retrieve records with a specific postal code from the database
6. From the Mule Palette, drag a Database Select operation and drop it in the flow.
7. In the Select properties view, set the following:
· Display Name: accounts
· Connector configuration: Database_Config
· SQL query text: SELECT * FROM accounts WHERE postal = :postal
8. In the Input Parameters section, use expression mode to create a postal input parameter with the specific postal code you used in the previous walkthrough.
{ postal: 'yourPostalValue'}
Note: If you want, you can store the postal code as a property in config.yaml and then reference it here in the DataWeave expression as { postal: p('propertyName')}.
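In the Configuration XML view, the Select operation configured above might look roughly like this. It is a sketch that assumes the db namespace prefix is already declared on the root mule element and omits the doc:id attribute that Studio generates; yourPostalValue is a placeholder.

```xml
<db:select doc:name="accounts" config-ref="Database_Config">
    <!-- :postal is a named placeholder bound to the input parameter below -->
    <db:sql>SELECT * FROM accounts WHERE postal = :postal</db:sql>
    <!-- Replace yourPostalValue with the postal code from the previous walkthrough -->
    <db:input-parameters><![CDATA[#[{ postal: 'yourPostalValue' }]]]></db:input-parameters>
</db:select>
```

Because the value is passed as an input parameter rather than concatenated into the SQL text, the query is sent to the database as a parameterized statement.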
Output the records to a CSV file
9. Copy the Write operation in syncDBaccountsToCSV and paste it at the end of syncDBaccountsWithPostal.
10. In the properties view for the Write operation, set the following values:
· Display Name: DBaccountsPostal.csv
· Path: output/DBaccountsPostal.csv
11. Change the content to output the payload as application/csv with a header property equal to false.
output application/csv header=false --- payload
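As a rough sketch, the pasted Write operation might appear in the Configuration XML view as follows. Any attributes carried over from the copied operation (for example, a connector configuration or a write mode) are not shown, because they depend on how the original operation was set up.

```xml
<!-- Attributes inherited from the copied operation and doc:id are omitted -->
<file:write doc:name="DBaccountsPostal.csv" path="output/DBaccountsPostal.csv">
    <!-- Serializes the retrieved records as CSV without a header row -->
    <file:content><![CDATA[#[output application/csv header=false --- payload]]]></file:content>
</file:write>
```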
Log the payload
12. Add a Logger at the end of the flow and change the display name to CSV payload.
13. Set the message to the payload as type application/csv.
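In XML, this Logger might look roughly like the line below. The level attribute shown is the Logger default and is an assumption about what Studio writes out; doc:id is omitted.

```xml
<!-- Logs the current payload rendered as CSV -->
<logger level="INFO" doc:name="CSV payload" message="#[output application/csv --- payload]"/>
```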
Stop the syncDBaccountsToCSV flow so it does not run
14. In the properties view for the syncDBaccountsToCSV flow, set the initial state to stopped.
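This setting corresponds to the initialState attribute on the flow element. A minimal sketch, with the flow's existing processors elided:

```xml
<!-- The flow is deployed but does not start until it is started explicitly -->
<flow name="syncDBaccountsToCSV" initialState="stopped">
    <!-- existing event source and processors unchanged -->
</flow>
```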
Test the application
15. Save the file and run the project.
16. In the Clear Application Data dialog box, click Yes.
17. Watch the console; you should see the same records displayed every 10 seconds.
Note: Right now, all records with the matching postal code are retrieved – over and over again. Next, you will modify this so only new records with the matching postal code are retrieved.
18. Stop the project.
19. Return to the resources folder in your computer’s file browser; you should see the new DBaccountsPostal.csv file in the output folder.
20. Open the file and review the contents.
21. Delete the file.
Add the ObjectStore module to the project
22. In the Mule Palette, select Add Modules.
23. Select the ObjectStore connector on the right side of the Mule Palette and drag and drop it onto the left side.
24. If you get a Select module version dialog box, select the latest version and click Add.
Store the ID of the last record retrieved
25. Drag the Store operation for the ObjectStore connector from the Mule Palette and drop it after the Database Select operation.
26. In the Store properties view, set the display name and key to lastAccountID.
27. Set the value to an expression for the maximum value of accountID in the payload containing the retrieved records.
max(payload.*accountID)
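In the Configuration XML view, the Store operation might look roughly like this. The sketch assumes the os namespace prefix was added to the root mule element when the module was added, and it omits doc:id.

```xml
<os:store doc:name="lastAccountID" key="lastAccountID">
    <!-- payload.*accountID collects the accountID of every retrieved record;
         max returns the largest, which becomes the new watermark -->
    <os:value><![CDATA[#[max(payload.*accountID)]]]></os:value>
</os:store>
```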
Before the query, retrieve the ID of the last record retrieved
28. Drag the Retrieve operation for the ObjectStore connector from the Mule Palette and drop it before the Database Select operation.
29. In the Retrieve properties view, set the following values:
· Display Name: lastAccountID
· Key: lastAccountID
· Default value: 0
30. Select the Advanced tab and set the target variable to lastAccountID so the value is stored in a variable called lastAccountID.
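A rough XML equivalent of the Retrieve operation is shown below, again assuming the os namespace prefix is declared and omitting doc:id.

```xml
<!-- target stores the retrieved value in vars.lastAccountID and leaves the payload unchanged -->
<os:retrieve doc:name="lastAccountID" key="lastAccountID" target="lastAccountID">
    <!-- Used when no value has been stored under the key yet, for example after clearing application data -->
    <os:default-value>0</os:default-value>
</os:retrieve>
```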
Modify the query to only retrieve new records with a specific postal code
31. In the accounts Select properties view, add a second query input parameter called lastAccountID that is equal to the watermark value.
{postal: 'yourPostalValue', lastAccountID: vars.lastAccountID}
32. Modify the SQL query text to use this parameter to only retrieve new records.
SELECT * FROM accounts WHERE postal = :postal AND accountID > :lastAccountID
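Put together, the modified Select operation might look roughly like this in XML. It is a sketch with a placeholder postal code; the SQL is wrapped in CDATA so the greater-than sign needs no escaping, and doc:id is omitted.

```xml
<db:select doc:name="accounts" config-ref="Database_Config">
    <!-- Only rows with an accountID above the stored watermark are returned -->
    <db:sql><![CDATA[SELECT * FROM accounts WHERE postal = :postal AND accountID > :lastAccountID]]></db:sql>
    <!-- vars.lastAccountID is set by the ObjectStore Retrieve operation earlier in the flow -->
    <db:input-parameters><![CDATA[#[{ postal: 'yourPostalValue', lastAccountID: vars.lastAccountID }]]]></db:input-parameters>
</db:select>
```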
Only store the value if new records were retrieved
33. Add a Choice router after the Select operation.
34. Move the Store, Write, and Logger operations into the when branch of the router.
35. Add a Logger to the default branch.
36. In the default branch Logger properties view, set the display name and message to No new records.
37. In the Choice router, click the when scope and in the properties view, add an expression to route the event when the payload is not empty.
#[not isEmpty(payload)]
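The resulting router might look roughly like the following in the Configuration XML view. The contents of the when branch are elided, and the level attribute and omitted doc:id values are assumptions about what Studio generates.

```xml
<choice doc:name="Choice">
    <!-- Taken only when the Select operation returned at least one record -->
    <when expression="#[not isEmpty(payload)]">
        <!-- Store, Write, and CSV payload Logger operations go here -->
    </when>
    <!-- Default branch: nothing new, so the stored watermark is left untouched -->
    <otherwise>
        <logger level="INFO" doc:name="No new records" message="No new records"/>
    </otherwise>
</choice>
```

Keeping the Store operation inside the when branch matters: with an empty payload, max(payload.*accountID) has no values to compare, so the watermark should only be updated when records were actually retrieved.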
Clear the application data and debug the application
38. Add a breakpoint to the Retrieve operation.
39. Save the file and debug the project.
40. In the Clear Application Data dialog box, click Yes.
41. In the Mule Debugger, step to the Select operation; you should see a lastAccountID variable with a value of 0.
42. Step into the Choice router; you should see record(s) were retrieved from the database.
43. Click Resume; the flow should be executed again, and execution should be stopped back at the beginning of the flow.
44. Step to the Select operation; you should see the lastAccountID variable now has a non-zero value.
45. Step into the Choice router; you should see no records were retrieved from the database (unless someone else just added one with the same postal code!).
46. Stop the project and switch perspectives.
47. Return to your computer’s file explorer and locate and open the new DBaccountsPostal.csv file.
Note: If you see the same records more than once, it is because, during debugging, the flow was executed again before the watermark was stored.
48. Close the DBaccountsPostal.csv file.
Debug the application and do not clear application data
49. Return to Anypoint Studio and debug the project.
50. In the Clear Application Data dialog box, click No.
51. In the Mule Debugger, step to the Select operation; you should see a lastAccountID variable with the same non-zero value.
52. Step into the Choice router; no new records should have been returned.
53. Remove the breakpoint from the Retrieve operation.
54. Make sure there are no other breakpoints in the flow, then click Resume to continue application execution.
Note: If your application still halts at the Retrieve operation, stop then debug the project without clearing the application data.
Add a new account to the database
55. Return to the MUA Accounts page in the web browser.
56. Click Add Account.
57. Fill out the form with data and use the same postal code.
Note: Do not use quotes, especially in the name field.
58. Click Save.
59. You should see your new account on the MUA Accounts page.
60. Return to Anypoint Studio; you should see your new record in the console.
61. Return to your computer’s file explorer; the modified date on the DBaccountsPostal.csv file should have been updated.
62. Open DBaccountsPostal.csv and locate your new record at the end of the file.
63. Close the DBaccountsPostal.csv file.
64. Return to Anypoint Studio and switch perspectives.
65. Stop the project.